package server

import (
	"context"
	"fmt"
	"github.com/IBM/sarama"
	"kratos_kafka/internal/conf"
	"kratos_kafka/internal/service"
	"os"
	"os/signal"
	"slices"
)

// Notice 要消费分区的数据
var consumePartitions = []int32{2, 3}

type SaramaPartitions struct {
	// 将GreeterService放进去方便业务处理～
	greeterService *service.GreeterService
}

func NewSaramaPartitionsConsumer(greeterService *service.GreeterService) *SaramaPartitions {
	return &SaramaPartitions{
		greeterService: greeterService,
	}
}

// 详细使用参考：https://juejin.cn/post/6999263126713696293#heading-4
func (s *SaramaPartitions) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	// 轮询消息...
	for msg := range claim.Messages() {

		// Notice 去掉不消费分区的数据
		if !slices.Contains(consumePartitions, msg.Partition) {
			fmt.Printf("不是要消费分区的数据！partition: %v, offset: %v \n", msg.Partition, msg.Offset)

			// Notice 记得MarkMessage！不这样做的话，项目下一次重启后，还会从上一次没有mark的消息开始消费！！！！！
			session.MarkMessage(msg, "不是要消费分区的数据")

			continue
		}

		fmt.Printf("从article_Partitions收到了消息消息>>>: partition: %v, offset: %v, value: %v, mgsTimestamp: %v, \n", msg.Partition, msg.Offset, string(msg.Value), msg.Timestamp)

		// TODO 需要业务处理的话 先json解析一下.....

		// Notice MarkMessage！
		session.MarkMessage(msg, "")

	}

	return nil
}

func (s *SaramaPartitions) Setup(sarama.ConsumerGroupSession) error {
	return nil
}

func (s *SaramaPartitions) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

// 消费 article_Partitions中的消息
func NewSaramaKafkaArticlePartitionsConsumerServer(bootConf *conf.Bootstrap, greeterService *service.GreeterService) *sarama.ConsumerGroup {

	ctx, cancel := context.WithCancel(context.Background())

	// Notice 如何使用TLS连接kafka，参考 tests/sarama_kafka_tls 中的代码～

	// NewConfig的时候初始化了各种配置～
	saramaPartitionsConsumerConfig := sarama.NewConfig()

	// Notice 自动提交模式下已经能解决丢消息问题,没有特殊需求不需要手动提交
	// 手动提交消息参考下面的文章: https://juejin.cn/post/6999263126713696293#heading-5
	saramaPartitionsConsumerConfig.Consumer.Offsets.AutoCommit.Enable = true

	// Notice new consumer group
	saramaPartitionsConsumerGroup, errSaramaPartitionsConsumerGroup := sarama.NewConsumerGroup(
		// addr
		bootConf.MessageQueue.Kafka.Server.Addr,
		// group
		"my-group",
		saramaPartitionsConsumerConfig,
	)
	if errSaramaPartitionsConsumerGroup != nil {
		panic(errSaramaPartitionsConsumerGroup)
	}

	// 监听信号，用于优雅的停止消费者 Notice 防止 goroutine泄漏！
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt, os.Kill)

	// 使用自己定义的 RealConsumer 消费消息 重写 ConsumeClaim 方法处理消费的数据
	// 将greeterService当作参数传进去～
	currSaramaPartitionConsumer := NewSaramaPartitionsConsumer(greeterService)

	// 消费数据
	go func() {
		for {
			select {
			case err := <-saramaPartitionsConsumerGroup.Errors():
				if err != nil {
					fmt.Println("saramaPartitionsConsumerGroup.Errors():>> ", err)
				}
			// TODO ... 最好写一个 clean函数 将取消信号加进去～～
			case <-signals:
				printStopSignal2()

				saramaPartitionsConsumerGroup.Close()
				cancel()
				return
			default:
				errConsume := saramaPartitionsConsumerGroup.Consume(
					ctx,
					// topic
					[]string{"my-topic"},
					// Notice 重写了ConsumeClaim方法，业务中主要的处理逻辑写在这个方法里面
					currSaramaPartitionConsumer,
				)
				if errConsume != nil {
					fmt.Println("saramaPartitions消费消息发生错误！err: ", errConsume)
				}
			}
		}
	}()

	return &saramaPartitionsConsumerGroup
}

//func printMsgTest2(msg string) {
//
//	fmt.Println("printMsgTest..............", msg)
//
//}
//
//func printStopSignal2() {
//	fmt.Println("收到了停止的信号!!! 停止saramaPartitionsConsumer...")
//}
